﻿using EventBus.Common;
using Microsoft.Extensions.Options;
using Polly;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
using System;
using System.Collections.Concurrent;
using System.Net.Sockets;

namespace EventBus.RabbitMQ
{
    /// <summary>
    /// Owns a single shared RabbitMQ connection and hands out channels created on it.
    /// The connection is opened lazily, re-opened when it is found closed, and
    /// released when this object is disposed.
    /// </summary>
    public class ConnectionChannel : IConnectionChannel, IDisposable
    {
        /// <summary>Number of retries performed after the first failed connection attempt.</summary>
        private const int RetryCount = 5;

        /// <summary>Pause between two consecutive connection attempts.</summary>
        private static readonly TimeSpan RetryDelay = TimeSpan.FromSeconds(1);

        private readonly RabbitMQOptions _options;

        // Guards _connection and _disposed: GetConnection is reached both from
        // application threads and from the connection event handlers below.
        private readonly object _syncRoot = new object();

        private IConnection _connection;
        private volatile bool _disposed;

        /// <summary>
        /// Creates the provider from the configured RabbitMQ options.
        /// No connection is opened until one is first requested.
        /// </summary>
        /// <param name="options">Options wrapper holding host, port, credentials, virtual host and exchange name.</param>
        /// <exception cref="ArgumentNullException"><paramref name="options"/> is null.</exception>
        public ConnectionChannel(IOptions<RabbitMQOptions> options)
        {
            if (options == null)
            {
                throw new ArgumentNullException(nameof(options));
            }

            _options = options.Value;
        }

        /// <summary>
        /// Name of the exchange, taken from the configured options.
        /// </summary>
        public string ExchangeName => _options.ExchangeName;

        /// <summary>
        /// Ensures the connection is open (retrying on failure) and creates a new channel on it.
        /// </summary>
        /// <returns>A newly created channel; the caller is responsible for disposing it.</returns>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public IModel GetChannel()
        {
            // Use the connection returned by the retry helper rather than re-reading
            // the field, which another thread may swap or clear in the meantime.
            return ConnectWithRetry().CreateModel();
        }

        /// <summary>
        /// Ensures the connection is open (retrying on failure) and creates a new channel on it.
        /// Currently behaves exactly like <see cref="GetChannel"/>.
        /// </summary>
        /// <returns>A newly created channel; the caller is responsible for disposing it.</returns>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public IModel GetSubscriberChannel()
        {
            return ConnectWithRetry().CreateModel();
        }

        /// <summary>
        /// Returns the current connection if it is open; otherwise opens a new one,
        /// subscribes to its lifecycle events and releases the connection it replaces.
        /// A single attempt is made here; see <see cref="TryConnection"/> for the retrying variant.
        /// </summary>
        /// <returns>An open connection.</returns>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public IConnection GetConnection()
        {
            IConnection stale;
            IConnection fresh;

            lock (_syncRoot)
            {
                if (_disposed)
                {
                    throw new ObjectDisposedException(nameof(ConnectionChannel));
                }

                if (_connection != null && _connection.IsOpen)
                {
                    return _connection;
                }

                var connectionFactory = new ConnectionFactory
                {
                    UserName = _options.UserName,
                    Port = _options.Port,
                    Password = _options.Password,
                    VirtualHost = _options.VirtualHost,
                    HostName = _options.HostName
                };

                fresh = connectionFactory.CreateConnection();
                fresh.ConnectionShutdown += Connection_ConnectionShutdown;
                fresh.CallbackException += Connection_CallbackException;
                fresh.ConnectionBlocked += Connection_ConnectionBlocked;

                stale = _connection;
                _connection = fresh;
            }

            // Tear the replaced connection down outside the lock so that any event it
            // raises while closing cannot contend with callers waiting on _syncRoot.
            Release(stale);
            return fresh;
        }

        /// <summary>
        /// Ensures the connection is open, retrying up to <see cref="RetryCount"/> times
        /// with a fixed delay when the broker cannot be reached.
        /// </summary>
        /// <exception cref="BrokerUnreachableException">The broker is still unreachable after the last retry.</exception>
        /// <exception cref="SocketException">A socket error persists after the last retry.</exception>
        /// <exception cref="ObjectDisposedException">This instance has been disposed.</exception>
        public void TryConnection()
        {
            ConnectWithRetry();
        }

        /// <summary>
        /// Runs <see cref="GetConnection"/> under the retry policy and returns the resulting connection.
        /// </summary>
        private IConnection ConnectWithRetry()
        {
            var policy = Policy.Handle<SocketException>().Or<BrokerUnreachableException>()
                .WaitAndRetry(RetryCount, attempt => RetryDelay, (ex, delay) =>
                {
                    // Intentionally empty: this class has no logger to report failed attempts to.
                });

            return policy.Execute<IConnection>(() => GetConnection());
        }

        /// <summary>
        /// Re-establishes the connection on behalf of an event handler, unless this
        /// instance has been (or is concurrently being) disposed.
        /// </summary>
        private void ReconnectIfActive()
        {
            if (_disposed)
            {
                // Shutdown caused by Dispose(): reconnecting here would leak a new connection.
                return;
            }

            try
            {
                TryConnection();
            }
            catch (ObjectDisposedException)
            {
                // Dispose() won the race after the check above; nothing left to reconnect.
            }
        }

        /// <summary>
        /// Unsubscribes this instance from the connection's events, closes the
        /// connection if it is still open and disposes it. Null is ignored.
        /// </summary>
        /// <param name="connection">The connection to release; may be null.</param>
        private void Release(IConnection connection)
        {
            if (connection == null)
            {
                return;
            }

            // Detach first so that closing does not trigger a reconnect via the shutdown handler.
            connection.ConnectionShutdown -= Connection_ConnectionShutdown;
            connection.CallbackException -= Connection_CallbackException;
            connection.ConnectionBlocked -= Connection_ConnectionBlocked;

            try
            {
                if (connection.IsOpen)
                {
                    connection.Close();
                }

                connection.Dispose();
            }
            catch (OperationInterruptedException)
            {
                // The connection was already closed or closing; there is nothing more to release.
            }
            catch (System.IO.IOException)
            {
                // The underlying transport is already gone; cleanup is best effort.
            }
        }

        /// <summary>
        /// Handles the connection-blocked event by making sure an open connection exists.
        /// </summary>
        /// <param name="sender">The connection raising the event.</param>
        /// <param name="e">Event data supplied by the client library.</param>
        private void Connection_ConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
        {
            ReconnectIfActive();
        }

        /// <summary>
        /// Handles a callback exception reported by the connection by making sure an open connection exists.
        /// </summary>
        /// <param name="sender">The connection raising the event.</param>
        /// <param name="e">Event data supplied by the client library.</param>
        private void Connection_CallbackException(object sender, CallbackExceptionEventArgs e)
        {
            ReconnectIfActive();
        }

        /// <summary>
        /// Handles connection shutdown by re-establishing the connection,
        /// unless the shutdown was caused by disposing this instance.
        /// </summary>
        /// <param name="sender">The connection raising the event.</param>
        /// <param name="e">Event data supplied by the client library.</param>
        private void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e)
        {
            ReconnectIfActive();
        }

        /// <summary>
        /// Closes and disposes the connection, if one was opened. Safe to call
        /// more than once and when no connection was ever established.
        /// </summary>
        public void Dispose()
        {
            IConnection connection;

            lock (_syncRoot)
            {
                if (_disposed)
                {
                    return;
                }

                _disposed = true;
                connection = _connection;
                _connection = null;
            }

            Release(connection);
        }
    }
}
